#!/usr/bin/perl
#============================================================================
#
# Convert MySQL binlog events (binlog_format=ROW) to SQL.
# The script can print the replayed SQL as well as flashback (undo) SQL;
# flashback is only possible for DML statements.
# author: Succy<1459307744@qq.com>
# date: 2021-05-01
#
# ===========================================================================

use strict;
use warnings;
use Getopt::Long qw(:config bundling no_ignore_case);
use utf8;
use 5.016;
use DBI;

our $VERSION = "1.0.1";
my %opt = ();

GetOptions(
    \%opt,              'sql-type=s',      'only-dml',         't|tables=s',
    'f|binlog=s',       'help',            'start-datetime=s', 'stop-datetime=s',
    'start-position=i', 'stop-position=i', 'd|databases=s',    'u|user=s',
    'h|host=s',         'P|port=i',        'p|password:s',     'B|flashback',
    'v|version'
) || die usage();

sub usage {
    print <<_EOF_;
bin2sql Ver $VERSION, for Linux (x86_64).
Copyright (c) 2021 Succy.

MySQL binlog to SQL and flashback DML SQL tools.
Options:
    -h, --host=name             Get the binlog from server, default localhost.
    -u, --user=name             Connect to the remote server as username, default root.
    -P, --port=#                Port number to use for connection or 3306 for default to.
    -p, --password[=name]       Password to connect to remote server.
    -t, --tables=name           Export tables in table names, delimiter by comma.
    -d, --database=name         List entries for just this database (local log only).
    -B, --flashback             Is print flashback SQL, only DML could be flashback.
    --start-datetime=name       Start reading the binlog at first event having a datetime
                                equal or posterior to the argument; the argument must be
                                a date and time in the local time zone, in any format
                                accepted by the MySQL server for DATETIME and TIMESTAMP
                                types, for example: 2004-12-25 11:25:56 (you should
                                probably use quotes for your shell to set it properly).

    --start-position=#          Start reading the binlog at position N. Applies to the
                                first binlog passed on the command line.

    --stop-datetime=name        Stop reading the binlog at first event having a datetime
                                equal or posterior to the argument; the argument must be
                                a date and time in the local time zone, in any format
                                accepted by the MySQL server for DATETIME and TIMESTAMP
                                types, for example: 2004-12-25 11:25:56 (you should
                                probably use quotes for your shell to set it properly).

    --stop-position=#           Stop reading the binlog at position N. Applies to the
                                last binlog passed on the command line.

    --only-dml                  Only print dml sql, optional, default disabled.
    --sql-type                  Sql type you want to process, support INSERT, UPDATE, DELETE.
    -f, --binlog=name           Read from binlog file.
    -v, --version               Output version information and exit. 
    --help                      Print help message.
_EOF_

    exit;
}

# Print version infomation and exit.
if ( $opt{'v'} ) {
    print "bin2sql Ver $VERSION, for Linux (x86_64).\n";
    exit;
}

if (   $opt{'help'}
    or ( !defined $opt{'f'} || $opt{'f'} eq "" )
    or ( !defined $opt{'d'} || $opt{'d'} eq "" ) 
    )
{
    usage();
}

# db config
my $db_pass = "123456";
if ( defined $opt{'p'} ) {
    $db_pass = $opt{'p'};
    if ( $db_pass eq '' ) {
        print "Enter Password:";
        $db_pass = <STDIN>;
        chomp $db_pass;
    }
}

my $db_user   = $opt{'u'} && $opt{'u'} ne '' ? $opt{'u'} : "root";
my $db_host   = $opt{'h'} && $opt{'h'} ne '' ? $opt{'h'} : "localhost";
my $db_port   = $opt{'P'} ? $opt{'P'} : 3306;
my %tb_schema = ();

my $cmd = qq{mysqlbinlog -vv --base64-output=DECODE-ROWS --skip-gtids -R -u$db_user -h$db_host -P$db_port -p$db_pass};

$cmd .= " --start-position=" . $opt{'start-position'}        if $opt{'start-position'};
$cmd .= " --stop-position=" . $opt{'stop-position'}          if $opt{'stop-position'};
$cmd .= " --start-datetime='" . $opt{'start-datetime'} . "'" if $opt{'start-datetime'};
$cmd .= " --stop-datetime='" . $opt{'stop-datetime'} . "'"   if $opt{'stop-datetime'};
$cmd .= " --database=" . $opt{'d'}                           if $opt{'d'};
$cmd .= " " . $opt{'f'};

print "# use command: $cmd\n";
open( FILE, "$cmd |" ) || die "can not execute mysqlbinlog\n";

my @tables    = $opt{'t'} && $opt{'t'} ne ''               ? split( /,/, $opt{'t'} ) : ();
my @sql_types = $opt{'sql-type'} && $opt{'sql-type'} ne '' ? map lc, split( /,/, $opt{'sql-type'} ) : ();
my $only_dml  = $opt{'only-dml'}                           ? 1 : 0;

# 是否闪回
my $flashback = $opt{'B'} ? 1 : 0;

#print "@sql_types\n";
my $block       = "";
my $line        = "";
my $end_log_pos = 0;
my $matchfilter = 0;

# 从数据库中获取table的schema信息，赋值给全局变量。避免每条语句都执行，一次性执行获取完
# 如果有指定表，则获取指定的表的信息即可。
sub fetch_table_schema {
    my $dbh =
      DBI->connect( "DBI:mysql:database=information_schema;host=$db_host;port=$db_port", "$db_user", "$db_pass" )
      or die "Unable to connect: $DBI::errstr\n";
    my $sql = "select column_name, table_name from information_schema.columns where table_schema='$opt{'d'}'";
    $sql .= " and table_name in(" . ( join ',', map "'$_'", @tables ) . ")" if @tables;
    $sql .= " order by table_name asc, ordinal_position asc";

    my $sth = $dbh->prepare($sql);
    $sth->execute();
    while ( my @row = $sth->fetchrow_array() ) {
        my ( $col_name, $table_name ) = @row;
        if ( exists( $tb_schema{$table_name} ) ) {

            # 使用指针来节约空间
            my $p_arr = \@{ $tb_schema{$table_name} };
            push @$p_arr, $col_name;
            $tb_schema{$table_name} = $p_arr;
        }
        else {
            my @arr = ($col_name);
            $tb_schema{$table_name} = \@arr;
        }
    }
    $sth->finish();
    $dbh->disconnect();
}
&fetch_table_schema;

# 处理SQL文本块，由于binlog解析出来的SQL，当同一个事务受影响有多条的时候，
# SQL是按照“文本块”的形式出现，因此需要做切分处理
# todo Perl很强大，只是我水平不行，应该还有更优雅的写法
sub process_sql_block {
    my $sql_block = shift;
    my $flag      = 0;
    my $sql       = "";
    my @arr       = split( /\n/, $sql_block );
    while ( my ( $index, $elem ) = each @arr ) {
        if ( @arr == $index + 1 && $flag ) {
            $elem =~ s/^\s+|\s+$//g;
            $sql .= $elem;
            if ($flashback) {
                &print_flashback_sql($sql);
            }
            else {
                &print_normal_sql($sql);
            }
        }
        if ( $elem =~ /^\s*(update|delete|insert|create|drop).*/i ) {

            # 两种情况：1、flag为true 2、数组只有一条有效记录
            if ( $flag || @arr == 2 ) {
                if ( @arr == 2 ) {
                    $sql .= $elem;
                }
                if ($flashback) {
                    &print_flashback_sql($sql);
                }
                else {
                    &print_normal_sql($sql);
                }
                $flag = 0;
                $sql  = "";
            }
            $flag = 1;
            $sql .= $elem . "\n";
        }
        elsif ( $elem =~ /^#(.+)server .+end_log_pos (\d+) .*/ ) {
            print "#$1end_log_pos: $2\n";
        }
        else {
            $sql .= $elem . "\n";
        }
    }
}

# 打印正常的sql
sub print_normal_sql {
    my $sql = shift;

    # 按照换行符切割成一个数组，并且去掉前后空格
    my @sql_split = map {
        ( my $s = $_ ) =~ s/^\s+|\s+$//g;
        $s;
    } split /\n/, $sql;

    # update 语句需要将WHERE与SET翻转
    if ( $sql =~ /update `?([a-z_-]+)`?\.`?([a-z_]+)`?.*/i ) {
        my $schema  = $1;
        my $tb_name = $2;
        my @cols    = @{ $tb_schema{$tb_name} };

        @sql_split = map {
            ( my $x = $_ ) =~ s/@(\d+)=/`$cols[$1 - 1]`=/g;
            $x
        } @sql_split;
        my $where = join ' AND ', grep { /WHERE/ .. /SET/ and !/WHERE/ and !/SET/ } @sql_split;
        my $set   = join ', ',    grep { /SET/ .. /\Q$sql_split[$#sql_split]\E/ and !/SET/ } @sql_split;
        $sql = "UPDATE `$schema`.`$tb_name` SET $set WHERE $where;\n";
    }

    # delete语句，需要将WHERE后面的条件加上AND
    elsif ( $sql =~ /delete from `?([a-z_]+)`?\.`?([a-z_]+)`?.*/i ) {
        my $schema  = $1;
        my $tb_name = $2;
        my @cols    = @{ $tb_schema{$tb_name} };

        @sql_split = map {
            ( my $x = $_ ) =~ s/@(\d+)=/`$cols[$1 - 1]`=/g;
            $x
        } @sql_split;
        my $where = join ' AND ', grep { /WHERE/ .. /\Q$sql_split[$#sql_split]\E/ and !/WHERE/ } @sql_split;
        $sql = "DELETE FROM `$schema`.`$tb_name` WHERE $where;\n";
    }

    # insert语句，需要补充values
    elsif ( $sql =~ /insert into `?([a-z_-]+)`?\.`?([a-z_]+)`?\.*/i ) {
        my $schema  = $1;
        my $tb_name = $2;
        my @cols    = @{ $tb_schema{$tb_name} };

        @sql_split = map {
            ( my $x = $_ ) =~ s/@(\d+)=/`$cols[$1 - 1]`=/g;
            $x
        } @sql_split;
        my $values = join ', ', map {
            ( my $x = $_ ) =~ s/`.+`=//g;
            $x
        } grep { /SET/ .. /\Q$sql_split[$#sql_split]\E/ and !/SET/ } @sql_split;
        $sql = "INSERT INTO `$schema`.`$tb_name` VALUES($values);\n";
    }
    else {
        chomp $sql;
        $sql .= ";\n";
    }

    print "$sql";
}

# 打印闪回SQL语句。能够进行闪回的只有INSERT/UPDATE/DELETE
sub print_flashback_sql {
    my ($sql) = @_;

    # 按照换行符切割成一个数组，并且去掉前后空格
    my @sql_split = map {
        ( my $s = $_ ) =~ s/^\s+|\s+$//g;
        $s;
    } split /\n/, $sql;

    # update 语句需要将WHERE与SET翻转
    if ( $sql =~ /update `?([a-z_-]+)`?\.`?([a-z_]+)`?.*/i ) {
        my $schema  = $1;
        my $tb_name = $2;
        my @cols    = @{ $tb_schema{$tb_name} };

        @sql_split = map {
            ( my $x = $_ ) =~ s/@(\d+)=/`$cols[$1 - 1]`=/g;
            $x
        } @sql_split;
        my $set   = join ', ',    grep { /WHERE/ .. /SET/ and !/WHERE/ and !/SET/ } @sql_split;
        my $where = join ' AND ', grep { /SET/ .. /\Q$sql_split[$#sql_split]\E/ and !/SET/ } @sql_split;
        print "UPDATE `$schema`.`$tb_name` SET $set WHERE $where;\n";
    }

    # delete语句，需要将转换成insert语句
    elsif ( $sql =~ /delete from `?([a-z_]+)`?\.`?([a-z_]+)`?.*/i ) {
        my $schema  = $1;
        my $tb_name = $2;
        my @cols    = @{ $tb_schema{$tb_name} };

        @sql_split = map {
            ( my $x = $_ ) =~ s/@(\d+)=/`$cols[$1 - 1]`=/g;
            $x
        } @sql_split;
        my $values = join ', ', map {
            ( my $x = $_ ) =~ s/`.+`=//g;
            $x
        } grep { /WHERE/ .. /\Q$sql_split[$#sql_split]\E/ and !/WHERE/ } @sql_split;
        print "INSERT INTO `$schema`.`$tb_name` VALUES($values);\n";
    }

    # insert语句，需要翻转成delete语句
    elsif ( $sql =~ /insert into `?([a-z_-]+)`?\.`?([a-z_]+)`?\.*/i ) {
        my $schema  = $1;
        my $tb_name = $2;
        my @cols    = @{ $tb_schema{$tb_name} };

        @sql_split = map {
            ( my $x = $_ ) =~ s/@(\d+)=/`$cols[$1 - 1]`=/g;
            $x
        } @sql_split;
        my $where = join ' AND ', grep { /SET/ .. /\Q$sql_split[$#sql_split]\E/ and !/SET/ } @sql_split;
        print "DELETE FROM `$schema`.`$tb_name` WHERE $where;\n";
    }
    else {
        return;
    }
}

while ( $line = <FILE> ) {
    if ( $line ne "" ) {
        if (   $line =~ /^\/\*/
            || $line =~ /ROLLBACK/
            || $line =~ /DELIMITER/
            || $line =~ /SET TIMESTAMP=\d+.+/i
            || $line =~ /SET \@\@SESSION.GTID_NEXT.+/ )
        {
            #do nothing
        }
        elsif ( $line =~ /^#\d+.+end_log_pos (\d+) .*/ ) {

            #determin end_log_pos
            $end_log_pos = $1;
            $block .= $line;
        }
        elsif ( $line =~ /# at (\d+)/ ) {

            # $start_log_pos = $1;
            if ( ( $end_log_pos == $1 && $matchfilter ) ) {
                chomp($block);

                # 去除###
                $block =~ s/###\s*//g;

                # 去除后面的/*类型*/
                $block =~ s/\s*\/\*.*\*\///g;
                &process_sql_block($block);
            }

            #clean variables
            $block       = "";
            $end_log_pos = 0;
            $matchfilter = 0;
        }
        elsif ( $line =~ /^# End of log file/ ) {

            # 结束了，直接跳出循环
            if ($matchfilter) {

                # 去除###
                $block =~ s/###\s*//g;

                # 去除后面的/*类型*/
                $block =~ s/\s*\/\*.*\*\///g;
                chomp $block;
                &process_sql_block($block);
            }
            last;
        }
        else {
            # print $line;
            if ( $line =~ /^### update `[a-z_-]+`\.`?([a-z_]+)`?.*/i ) {
                $matchfilter = 1;

                #update
                if (@tables) {
                    $matchfilter = 0;
                    if ( $1 ~~ @tables ) {
                        $matchfilter = 1;
                    }
                }

                # 如果表已经被删除了，那么SQL就没办法解析出来了，直接忽略
                unless ( $1 ~~ %tb_schema ) {
                    $matchfilter = 0;
                }
                if ( $matchfilter == 1 ) {
                    if (@sql_types) {
                        $matchfilter = 0;
                        if ( 'update' ~~ @sql_types ) {
                            $matchfilter = 1;
                        }
                    }
                }
                if ( $matchfilter == 1 ) {
                    $block .= $line;
                }
            }
            elsif ( $line =~ /^### insert into `?[a-z-_]+`?\.`?([a-z_]+)`?\.*/i ) {
                $matchfilter = 1;

                #insert
                if (@tables) {
                    $matchfilter = 0;
                    if ( $1 ~~ @tables ) {
                        $matchfilter = 1;
                    }
                }

                # 如果表已经被删除了，那么SQL就没办法解析出来了，直接忽略
                unless ( $1 ~~ %tb_schema ) {
                    $matchfilter = 0;
                }
                if ( $matchfilter == 1 ) {
                    if (@sql_types) {
                        $matchfilter = 0;
                        if ( 'insert' ~~ @sql_types ) {
                            $matchfilter = 1;
                        }
                    }
                }
                if ( $matchfilter == 1 ) {
                    $block .= $line;
                }
            }
            elsif ( $line =~ /^### delete from `?[a-z_]+`?\.`?([a-z_]+)`?.*/i ) {
                $matchfilter = 1;

                #delete
                if (@tables) {
                    $matchfilter = 0;
                    if ( $1 ~~ @tables ) {
                        $matchfilter = 1;
                    }
                }

                # 如果表已经被删除了，那么SQL就没办法解析出来了，直接忽略
                unless ( $1 ~~ %tb_schema ) {
                    $matchfilter = 0;
                }
                if ( $matchfilter == 1 ) {
                    if (@sql_types) {
                        $matchfilter = 0;
                        if ( 'delete' ~~ @sql_types ) {
                            $matchfilter = 1;
                        }
                    }
                }

                if ( $matchfilter == 1 ) {
                    $block .= $line;
                }
            }
            elsif ($line =~ /^ *drop table if exists `?([a-z_]+)`?.*/i
                || $line =~ /^ *drop table `?([a-z_]+)`?.*/i
                || $line =~ /^ *truncate table `?([a-z_]+)`?.*/i
                || $line =~ /^ *alter table `?([a-z_]+)`? .+/i
                || $line =~ /^ *create table `?[a-z_-]+`?\.`?([a-z_]+)`? .+/i
                || $line =~ /^ *create table `?([a-z_]+)`? .+/i
                || $line =~ /^ *create (unique index|index) .+ on `?([a-z_]+)`? .+/i
                || $line =~ /^ *create view .+ from `?([a-z_]+)`? .+/i )
            {
                $matchfilter = 1;
                if (@tables) {
                    $matchfilter = 0;

                    if ( $1 ~~ @tables ) {
                        $matchfilter = 1;
                    }
                }
                if ( $matchfilter == 1 ) {
                    $matchfilter = $only_dml ? 0 : 1;
                }

                # sql_types不包含这些，只包含insert/update/delete
                if (@sql_types) {
                    $matchfilter = 0;
                }
                if ( $matchfilter == 1 ) {
                    $block .= $line;
                }
            }
            else {
                $block .= $line;
            }
        }
    }
}

# print "$opt{'t'}-$opt{'tables'}\n";
